qlands / elasticfeeds
A Python library for managing feeds using ElasticSearch
License: Other
I'm not sure if this is beyond the scope of this project - but possibly still OK -
I'm wiring up a Lambda call, and it currently takes a parameter:
https://website.com?q=testuser
but I'm wondering if we could write some smarts to help here.
### Activity Service REST API
The API is abstract, and allows any node in the graph to take the assumed role of actor, object, target, context etc. This means that the direction in which an activity occurred matters. For instance, supposing a YouTube video could favorite something, the activity would then be (actor:youtube_video)-FAVORITED->(object:special_something). Asking the API about activities that the YouTube video has done means placing the YouTube video in the context of an actor, whereas asking about activities that have been done on the YouTube video means placing it in the context of an object.
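To make the direction point concrete, here is a minimal Python sketch in which the same FAVORITED edge is returned when the YouTube video is asked about as an actor, but not as an object. The `Node`, `Edge`, `done_by` and `done_on` names and the ids "v1"/"s1" are hypothetical illustrations, not part of elasticfeeds:

```python
from typing import List, NamedTuple


class Node(NamedTuple):
    type: str
    id: str


class Edge(NamedTuple):
    # One activity: (actor)-VERB->(object); the direction matters.
    actor: Node
    verb: str
    obj: Node


def done_by(edges: List[Edge], node: Node) -> List[Edge]:
    """Activities in which `node` takes the actor role."""
    return [e for e in edges if e.actor == node]


def done_on(edges: List[Edge], node: Node) -> List[Edge]:
    """Activities in which `node` takes the object role."""
    return [e for e in edges if e.obj == node]


video = Node("youtube_video", "v1")
something = Node("special_something", "s1")
edges = [Edge(video, "FAVORITED", something)]

print(len(done_by(edges, video)), len(done_on(edges, video)))  # 1 0
```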
#### Actor Context
Get all nodes of a type
'get /api/v1/actor/:actor' -> /api/v1/actor/youtube_user
Get a node with a specific id
'get /api/v1/actor/:actor/:actor_id' -> /api/v1/actor/youtube_user/1
Get all activities of a specific actor
'get /api/v1/actor/:actor/:actor_id/activities' -> /api/v1/actor/youtube_user/1/activities
Get all activities of a user with a specific verb
'get /api/v1/actor/:actor/:actor_id/:verb' -> /api/v1/actor/youtube_user/1/FAVORITED
Get all activities of a user with a verb, by object type
'get /api/v1/actor/:actor/:actor_id/:verb/:object' -> /api/v1/actor/youtube_user/1/FAVORITED/flickr_photo
Get the specific activity where a user verbed an object
'get /api/v1/actor/:actor/:actor_id/:verb/:object/:object_id' -> /api/v1/actor/youtube_user/1/FAVORITED/flickr_photo/1212
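One kind of "smarts" for the Lambda mentioned above would be to parse these routes into filters on the indexed feed document. A minimal sketch, assuming the field names of the `_source` document shown later in this thread (`actor.type`, `actor.id`, `type`, `object.type`, `object.id`) are mapped as keyword fields; `parse_actor_route` and `route_to_query` are hypothetical helpers, not elasticfeeds functions. Term queries are exact, so the verb casing in the route has to match what was indexed:

```python
from typing import Dict

# Route segment name -> field in the indexed feed document (assumed mapping).
FIELD_FOR = {
    "actor": "actor.type",
    "actor_id": "actor.id",
    "verb": "type",
    "object": "object.type",
    "object_id": "object.id",
}
SEGMENTS = list(FIELD_FOR)


def parse_actor_route(path: str) -> Dict[str, str]:
    """Split an actor-context path into its named segments."""
    parts = [p for p in path.split("/") if p]
    if parts[:3] != ["api", "v1", "actor"]:
        raise ValueError(f"not an actor-context route: {path}")
    values = parts[3:]
    # '/activities' right after the actor id means "no verb filter".
    if len(values) == 3 and values[2] == "activities":
        values = values[:2]
    if not 1 <= len(values) <= len(SEGMENTS):
        raise ValueError(f"unexpected number of segments: {path}")
    return dict(zip(SEGMENTS, values))


def route_to_query(path: str) -> dict:
    """Turn the route into an Elasticsearch/OpenSearch bool filter query."""
    params = parse_actor_route(path)
    filters = [{"term": {FIELD_FOR[name]: value}} for name, value in params.items()]
    return {"query": {"bool": {"filter": filters}}}


print(route_to_query("/api/v1/actor/youtube_user/1/FAVORITED"))
```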
UPDATE
create / delete
Post an Activity
'post /api/v1/activity':
{
actor: {
aid: <string>,
type: <appname_model>,
api: <api url>
},
verb: <string>,
object: {
aid: <string>,
type: <appname_model>,
api: <api url>
}
}
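The POST payload lines up with the feed document that appears further down this thread: `aid` becomes `id`, and the verb becomes the activity `type`. A minimal sketch of that mapping; `feed_doc_from_post` is hypothetical, and storing each node's `api` URL under its `extra` is a guess. The indexed document also carries derived fields such as `published_date`, which are left out here:

```python
import datetime
from typing import Optional


def feed_doc_from_post(body: dict, published: Optional[datetime.datetime] = None) -> dict:
    """Map the POST /api/v1/activity payload onto the feed document layout."""
    published = published or datetime.datetime.now()
    doc = {
        "published": published.isoformat(),
        "actor": {"id": body["actor"]["aid"], "type": body["actor"]["type"]},
        "type": body["verb"],
        "object": {"id": body["object"]["aid"], "type": body["object"]["type"]},
    }
    # Keep each node's api URL as extra data so nothing from the request is dropped.
    for role in ("actor", "object"):
        api = body[role].get("api")
        if api:
            doc[role]["extra"] = {"api": api}
    return doc


body = {
    "actor": {"aid": "1", "type": "youtube_user", "api": "https://example.com/api/users/1"},
    "verb": "FAVORITED",
    "object": {"aid": "1212", "type": "flickr_photo"},
}
print(feed_doc_from_post(body))
```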
Delete an Activity
'delete /api/v1/activity/:actor/:actor_id/:verb/:object/:object_id' ->
/api/v1/activity/youtube_user/1/FAVORITED/flickr_photo/14442
https://opensearch.org/docs/latest/clients/python-low-level/
Going to attempt to switch the base elasticsearch client over to opensearch-py.
UPDATE
good news - switched these lines and everything still works.
manager.py
from opensearchpy import OpenSearch
from opensearchpy.exceptions import RequestError
https://gist.github.com/johndpope/0f77156bfc1d43bd0c23df0d34d86b01
tests seem to be passing fine.
now to attempt to wire up to opensearch end point.
FYI - worth a read regarding elasticsearch 8 + licenses.
https://aws.amazon.com/what-is/opensearch/
I'm at a point where I don't know how / why things work, and how I think things should work isn't aligning with what I see.
I have some superficial questions on the code / tests.
Here goes - thanks in advance for any insights.
Question)
Where is edoquiros in test_00_manager?
https://github.com/qlands/elasticfeeds/blob/master/elasticfeeds/tests/test_00_manager.py#L67C36-L67C45
Because you're using "cquiros" for both the LinkedActivity and the Link actor_id, the link seems to describe cquiros following cquiros - can you help unpack this?
# Creates a linked activity
# tst_linked_activity = LinkedActivity("cquiros") ## q) is this supposed to be edoquiros?
tst_linked_activity = LinkedActivity(activity_id=str(uuid.uuid4())) # q) if it is really "cquiros" - then it seems it follows itself.
tst_linked_activity.activity_class = "actor"
tst_linked_activity.activity_type = "person"
# Creates a link
tst_link = Link(actor_id="cquiros", linked_activity=tst_linked_activity)  # UPDATE) digging through the code - maybe it's better to spell out actor_id like this
tst_link.linked = now
tst_link.link_type = "follow"
tst_link.link_weight = 1
tst_link.extra = {"some_extra_data": "test"}
# Adds the network link
tst_manager.add_network_link(tst_link)
# Carlos follows Eduardo. Test of convenience function. Q) is this Carlos following Eduardo back?
tst_manager.follow("cquiros", "edoquiros", now) # Q) is there no referential integrity? where does edoquiros come from?
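Reading `follow()` (quoted later in this thread) alongside the TypeScript port of `Link.get_dict()`, a follow link looks like a plain network document: `actor_id` is the follower and `linked_activity.id` is whoever is followed. On that reading, `LinkedActivity("cquiros")` combined with `Link(actor_id="cquiros", ...)` would describe cquiros following itself, and "edoquiros" only ever exists as a string in `linked_activity.id`. A minimal sketch of the document shape, assuming the field names from that port; `follow_link_doc` is hypothetical:

```python
import datetime
from typing import Optional


def follow_link_doc(actor_id: str, following: str,
                    linked: Optional[datetime.datetime] = None,
                    activity_type: str = "person") -> dict:
    """Approximate the network document that follow(actor_id, following) would index."""
    linked = linked or datetime.datetime.now()
    for value in (actor_id, following):
        if len(value.split(" ")) != 1:
            raise ValueError(f"ids must be a single word: {value!r}")
    # This sketch does no existence check on `following`; the follow() code
    # quoted in this thread does not show one either.
    return {
        "linked": linked,
        "actor_id": actor_id,          # the follower
        "link_type": "follow",
        "linked_activity": {
            "activity_class": "actor",
            "id": following,           # the one being followed
            "type": activity_type,
        },
        "link_weight": 1,
    }


doc = follow_link_doc("cquiros", "edoquiros", datetime.datetime(2023, 8, 23))
print(doc["actor_id"], "->", doc["linked_activity"]["id"])  # cquiros -> edoquiros
```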
BACKGROUND
I successfully created the indexes:
tst_manager = Manager(
feed_index = "testfeeds",
network_index = "testnetwork",
byo_connection = True
)
tst_manager.assign_connection(opensearch)
In another process, importing liked YouTube videos,
this successfully adds the activity. It's basically a user "Blabla" liking a video, with the extra field containing the YouTube content.
✅ - this is working
tst_actor = Actor(actor_id, "person",extra={"url":"imagelink"})
# Creates an object
# video_id = dict['video_id']
# title = dict['title']
# description = dict['description']
# url = dict['url']
tst_object = Object(video_id, "video")
# Creates an Activity
tst_activity = Activity(
activity_type="liked", activity_actor=tst_actor,activity_object=tst_object, published=now + datetime.timedelta(minutes=12),extra={"url":video_url,"description":description,"title":title,"vid":vid}
)
# Adds the activity
tst_manager.add_activity_feed(tst_activity)
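The commented-out lines pull `video_id`, `title`, `description` and `url` out of a dict by hand. Since the `vid` stored in `extra` is the full video resource, those values can be derived from it. A minimal sketch, assuming `vid` is a YouTube Data API video resource like the one in the indexed JSON below; `youtube_extra` is a hypothetical helper:

```python
def youtube_extra(video: dict) -> dict:
    """Build the activity's extra payload from a YouTube Data API video resource."""
    snippet = video.get("snippet", {})
    return {
        "url": f"https://www.youtube.com/watch?v={video['id']}",
        "description": snippet.get("description", ""),
        "title": snippet.get("title", ""),
        "vid": video,  # keep the full resource, as in the indexed document
    }


vid = {"kind": "youtube#video", "id": "OuzUyrcvTg4",
       "snippet": {"title": "Pilot Tries Impossible Turn!", "description": "..."}}
extra = youtube_extra(vid)
print(extra["url"])  # https://www.youtube.com/watch?v=OuzUyrcvTg4
```

The result would then be passed as `extra=youtube_extra(vid)` when constructing the `Activity`.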
and I get the following json using this query
GET _search
{
"query": {
"match_all": {}
}
}
✅ - this is working
{
"_index": "testfeeds",
"_id": "ea2839ae-8cdf-4be8-b928-7914941385d9",
"_score": 1,
"_source": {
"published": "2023-08-23T08:17:33.350965",
"published_date": "2023-08-23",
"published_time": "08:17:33",
"published_year": 2023,
"published_month": 8,
"actor": {
"id": "blabla",
"type": "person",
"extra": {
"url": "imagelink"
}
},
"type": "liked",
"object": {
"id": "OuzUyrcvTg4",
"type": "video"
},
"extra": {
"url": "https://www.youtube.com/watch?v=OuzUyrcvTg4",
"description": """The impossibblablalblalblba.""",
"title": "Pilot Tries Impossible Turn!",
"vid": {
"kind": "youtube#video",
"etag": "E3ofbOEgXPR0rsC3xav4H2b88Zk",
"id": "OuzUyrcvTg4",
"snippet": {
"publishedAt": "2023-08-05T15:03:39Z",
"channelId": "UCBeZYVlqOeSSlrBSXl4aTig",
"title": "Pilot Tries Impossible Turn!",
"description": """The imposblablablalballab.""",
"thumbnails": {
"default": {
"url": "https://i.ytimg.com/vi/OuzUyrcvTg4/default.jpg",
"width": 120,
"height": 90
},
"medium": {
"url": "https://i.ytimg.com/vi/OuzUyrcvTg4/mqdefault.jpg",
"width": 320,
"height": 180
},
"high": {
"url": "https://i.ytimg.com/vi/OuzUyrcvTg4/hqdefault.jpg",
"width": 480,
"height": 360
},
"standard": {
"url": "https://i.ytimg.com/vi/OuzUyrcvTg4/sddefault.jpg",
"width": 640,
"height": 480
},
"maxres": {
"url": "https://i.ytimg.com/vi/OuzUyrcvTg4/maxresdefault.jpg",
"width": 1280,
"height": 720
}
},
"channelTitle": "Pilot Debrief",
"tags": [
"impossible turn",
"engine failure on takeoff",
"airplane stalls",
"plane loses engine on takeoff",
"plane stalls after takeoff",
"plane stalls and crashes",
"plane engine stalls",
"airplane crash",
"engine failure",
"airplane crash landing",
"airplane crash investigation",
"aviation",
"pilot",
"flying",
"Pilot debrief"
],
"categoryId": "22",
"liveBroadcastContent": "none",
"defaultLanguage": "en",
"localized": {
"title": "Pilot Tries Impossible Turn!",
"description": """The ibla babllabllablab """
},
"defaultAudioLanguage": "en"
}
}
}
}
},
BUT -
now, as another user, "johnpope",
I want to follow user id "blabla".
I use the helper:
tst_manager.follow(my_actor_id,following_actor_id)
and now I want to see the feed for johnpope (like a Facebook feed: posts by friends, stuff they liked etc.).
This API call simply uses the aggregator:
tst_date_weight_aggregator = DateWeightAggregator("cquiros") # is equal to the q str in uri
test = tst_manager.get_feeds(tst_date_weight_aggregator)
print("test:",test)
do I need something else?
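Conceptually, johnpope's feed needs both indices: the network index records whom johnpope follows, and the feeds index records what those actors did. The sketch below is a hand-rolled two-query version of that idea, not a description of how `DateWeightAggregator` works internally. It assumes the field names seen in this thread (`actor_id`, `link_type`, `linked_activity.id`, `actor.id`, `published`) are keyword/date fields; all function names are hypothetical:

```python
from typing import Iterable, List


def following_query(actor_id: str, size: int = 1000) -> dict:
    """Network-index query: whom does `actor_id` follow?"""
    return {
        "size": size,
        "_source": ["linked_activity.id"],
        "query": {"bool": {"filter": [
            {"term": {"actor_id": actor_id}},
            {"term": {"link_type": "follow"}},
            {"term": {"linked_activity.activity_class": "actor"}},
        ]}},
    }


def followed_ids(hits: Iterable[dict]) -> List[str]:
    """Collect the followed ids from the network-index hits."""
    return sorted({h["_source"]["linked_activity"]["id"] for h in hits})


def feed_query(followed: List[str], size: int = 20) -> dict:
    """Feeds-index query: newest activities done by anyone in `followed`."""
    return {
        "size": size,
        "sort": [{"published": {"order": "desc"}}],
        "query": {"terms": {"actor.id": followed}},
    }


# Shape of a network hit for "johnpope follows blabla".
hits = [{"_source": {"linked_activity": {"id": "blabla"}}}]
print(feed_query(followed_ids(hits))["query"])  # {'terms': {'actor.id': ['blabla']}}
```

With an opensearch-py client, these dicts would be passed as `client.search(index="testnetwork", body=following_query("johnpope"))`, then `client.search(index="testfeeds", body=feed_query(...))`.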
I dug into the other tests and see this:
https://github.com/qlands/elasticfeeds/blob/master/elasticfeeds/tests/test_02_aggregators.py#L37C3-L39C54
Here, would substituting "cquiros" with "johnpope" basically be the equivalent of viewing the feed for johnpope? The test contains:
# Test recent object type aggregator
tst_date_weight_aggregator = DateWeightAggregator("cquiros")
tst_manager.get_feeds(tst_date_weight_aggregator)
Do I want to target the feeds index or the network index here to get a feed for johnpope (which would just show the "blabla" user)?
Maybe the DateWeightAggregator example could show the parameter name -
UPDATE
tst_date_weight_aggregator = DateWeightAggregator(actor_id="cquiros")
tst_manager.get_feeds(tst_date_weight_aggregator)
UPDATE
for a sanity test -
GET _search
{
"size": 10,
"query": {
"match": {
"_index": "testnetwork"
}
}
}
I can see the testnetwork documents,
and I can see that the uuid doesn't make sense - and I can easily delete it.
POST testnetwork/_delete_by_query
{
"size": 10,
"query": {
"match": {
"_id": "233b500c-432c-49e0-91d4-33290ea2b071"
}
}
}
tst_linked_activity = LinkedActivity(activity_id=str(uuid.uuid4()))  # obviously wrong - must this match the actor_id??
this is the code
tst_manager = Manager(
feed_index = "testfeeds",
network_index = "testnetwork",
byo_connection = True
)
tst_manager.assign_connection(opensearch)
tst_manager.follow("johndpope", "blabla")  # just use this helper instead of the sample code, which uses cquiros twice; it should follow edoquiros
I looked at the docker-compose file, got stuck, and ended up using
docker-elk (15,000 stars)
https://github.com/deviantony/docker-elk/blob/main/docker-compose.yml
This requires a setup step that initializes the passwords (somewhat mandatory in the latest version?).
I upgraded test_manager, but now I want to pass in basic_auth -
it seems to support it.
I attempted to extend Manager to pass basic_auth through, but no joy.
https://gist.github.com/johndpope/07d03a03cf94d447ec3e61c405081385
This is the updated test:
import datetime
import time

import requests


def test_manager():
    es_host = "0.0.0.0"
    es_port = 9200
    use_ssl = False
    scheme = "https" if use_ssl else "http"
    host = "{}://{}:{}/_cluster/health".format(scheme, es_host, es_port)
    print("Waiting for ES to be ready check :", host)
    session = requests.Session()
    # requests treats a (user, password) tuple as HTTP Basic auth
    session.auth = ("elastic", "changeme")
    ready = False
    while not ready:
        try:
            resp = session.get(host)
            data = resp.json()
            print("json:", data)
            if data["status"] in ("yellow", "green"):
                ready = True
            else:
                time.sleep(30)
        except Exception as e:
            print(str(e))
            time.sleep(30)
    print("ES is ready")
    now = datetime.datetime.now()
    # Elasticsearch security uses HTTP Basic auth, so pass a (user, password) tuple
    # rather than requests' HTTPDigestAuth. Assumes the extended Manager from the
    # gist above forwards basic_auth to the Elasticsearch client.
    tst_manager = Manager(
        "testfeeds",
        "testnetwork",
        delete_network_if_exists=True,
        delete_feeds_if_exists=True,
        basic_auth=("elastic", "changeme"),
    )
UPDATE
I got around this by disabling security,
but now I run into a problem with elasticsearch v8:
(torch2) ➜ elasticfeeds git:(master) ✗ python elasticfeeds/tests/test_00_manager.py
Waiting for ES to be ready check : http://0.0.0.0:9200/_cluster/health
json: {'cluster_name': 'docker-cluster', 'status': 'green', 'timed_out': False, 'number_of_nodes': 1, 'number_of_data_nodes': 1, 'active_primary_shards': 0, 'active_shards': 0, 'relocating_shards': 0, 'initializing_shards': 0, 'unassigned_shards': 0, 'delayed_unassigned_shards': 0, 'number_of_pending_tasks': 0, 'number_of_in_flight_fetch': 0, 'task_max_waiting_in_queue_millis': 0, 'active_shards_percent_as_number': 100.0}
ES is ready
args: (<elasticsearch._sync.client.indices.IndicesClient object at 0x7f1bb3afa7a0>, 'feeds')
Traceback (most recent call last):
File "/home/oem/Documents/gitWorkspace/elasticfeeds/elasticfeeds/tests/test_00_manager.py", line 173, in <module>
test_manager()
File "/home/oem/Documents/gitWorkspace/elasticfeeds/elasticfeeds/tests/test_00_manager.py", line 42, in test_manager
tst_manager = Manager(
File "/home/oem/Documents/gitWorkspace/elasticfeeds/elasticfeeds/manager/manager.py", line 278, in __init__
if not connection.indices.exists("feeds"):
File "/home/oem/miniconda3/envs/torch2/lib/python3.10/site-packages/elasticsearch/_sync/client/utils.py", line 308, in wrapped
raise TypeError(
TypeError: Positional arguments can't be used with Elasticsearch API methods. Instead only use keyword arguments.
Not sure why this changed in v8 - the error says the v8 client only accepts keyword arguments for API methods.
New api
Running
pip install elasticsearch==7.7.1
I can see the old API from version 7.
UPDATE
making progress - I just needed to add index= in front of the index name argument in some method calls.
https://gist.github.com/johndpope/33e6acec7505b3d7a539b0ad763f088b
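For reference, the traceback points at `connection.indices.exists("feeds")` in manager.py, and the fix described in this update amounts to passing the index by keyword. A minimal sketch; `index_exists` is hypothetical, and the stub classes only imitate the v8 client's keyword-only rule so the snippet runs without a cluster:

```python
def index_exists(client, name: str) -> bool:
    """Check for an index using the keyword form the v8 client requires."""
    # indices.exists("feeds") raises TypeError on elasticsearch-py 8; index= works.
    return bool(client.indices.exists(index=name))


class _StubIndices:
    # Hypothetical stand-in that mimics the v8 keyword-only rule.
    def __init__(self, existing):
        self.existing = set(existing)

    def exists(self, *args, **kwargs):
        if args:
            raise TypeError("Positional arguments can't be used with Elasticsearch API methods.")
        return kwargs["index"] in self.existing


class _StubClient:
    def __init__(self, existing):
        self.indices = _StubIndices(existing)


client = _StubClient(["feeds"])
print(index_exists(client, "feeds"), index_exists(client, "network"))  # True False
```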
https://github.com/opensearch-project/opensearch-js/blob/main/USER_GUIDE.md
Add a Document to the Index
console.log('Adding document:');
var document = {
title: 'The Outsider',
author: 'Stephen King',
year: '2018',
genre: 'Crime fiction',
};
var id = '1';
var response = await client.index({
id: id,
index: index_name,
body: document,
refresh: true,
});
console.log(response.body);
UPDATE
I attempted to port the Python to TypeScript using ChatGPT
(maybe a waste of time)
class LinkedActivity {
constructor(
public activity_id: string,
public activity_class: string = "actor",
public activity_type: string = "person"
) {
const temp = activity_id.split(" ");
if (temp.length !== 1) {
throw new IDError(); // Assuming IDError is defined
}
if (!activity_class.match(/^[a-zA-Z]+$/)) {
throw new KeyWordError(activity_class); // Assuming KeyWordError is defined
}
if (activity_class !== "actor" && activity_class !== "object") {
throw new ActivityClassError(); // Assuming ActivityClassError is defined
}
if (!activity_type.match(/^[a-zA-Z]+$/)) {
throw new KeyWordError(activity_type); // Assuming KeyWordError is defined
}
}
get_dict(): Record<string, string> {
return {
activity_class: this.activity_class,
id: this.activity_id,
type: this.activity_type,
};
}
}
class Link {
constructor(
public actor_id: string,
public linked_activity: LinkedActivity,
public linked: Date = new Date(),
public link_type: string = "follow",
public link_weight: number = 1,
public extra: Record<string, any> | null = null
) {
const temp = actor_id.split(" ");
if (temp.length !== 1) {
throw new IDError(); // Assuming IDError is defined
}
if (!(linked instanceof Date)) {
throw new LinkedTypeError();
}
if (!link_type.match(/^[a-zA-Z]+$/)) {
throw new KeyWordError(link_type);
}
if (!(linked_activity instanceof LinkedActivity)) {
throw new LinkedActivityObjectError();
}
if (extra !== null && !isDictionary(extra)) {
throw new ExtraTypeError();
}
if (typeof link_weight !== "number") {
throw new WeightTypeError();
}
}
get_dict(): Record<string, any> {
const _dict: Record<string, any> = {
linked: this.linked,
actor_id: this.actor_id,
link_type: this.link_type,
linked_activity: this.linked_activity.get_dict(),
link_weight: this.link_weight,
};
if (this.extra !== null) {
_dict.extra = this.extra;
}
return _dict;
}
get_search_dict(): Record<string, any> {
const _dict: Record<string, any> = {
query: {
bool: {
must: [
{ term: { actor_id: this.actor_id } },
{ term: { link_type: this.link_type } },
{
term: {
"linked_activity.activity_class": this.linked_activity.activity_class,
},
},
{
term: {
"linked_activity.type": this.linked_activity.activity_type,
},
},
{
term: {
"linked_activity.id": this.linked_activity.activity_id,
},
},
],
},
},
};
return _dict;
}
}
function isDictionary(obj: any): boolean {
return typeof obj === "object" && obj !== null && !Array.isArray(obj);
}
def follow(
    self,
    actor_id,
    following,
    linked=None,
    activity_type="person",
):
    """
    A convenience function to declare a follow link
    :param actor_id: Actor ID whose link is being declared in the network
    :param following: The person that is being followed
    :param linked: Datetime of the link. Defaults to the time of the call.
    :param activity_type: String. Single word. The type of feed component that is being followed or watched.
    For example, if the class is "actor" then its type could be "Person", "User" or "Member".
    If the class is "object" then its type could be "Document", or "Project".
    :return: None
    """
    # A default of datetime.datetime.now() in the signature is evaluated only once,
    # when the function is defined, so every call would share that timestamp.
    if linked is None:
        linked = datetime.datetime.now()
    a_linked_activity = LinkedActivity(following, activity_type=activity_type)
    a_link = Link(actor_id, a_linked_activity, linked=linked)
    self.add_network_link(a_link)
https://github.com/GetStream/stream-python
I really just need a demonstration of this working:
result = user_feed_1.get(limit=5, offset=5)
I can help flesh out the opensearch queries
import datetime
import stream
# Create a new client
client = stream.connect('YOUR_API_KEY', 'API_KEY_SECRET')
# Create a feed object
user_feed_1 = client.feed('user', '1')
# Get activities from 5 to 10 (slow pagination)
result = user_feed_1.get(limit=5, offset=5)
# (Recommended & faster) Filter on an id less than the given UUID
result = user_feed_1.get(limit=5, id_lt="e561de8f-00f1-11e4-b400-0cc47a024be0")
# Create a new activity
activity_data = {'actor': 1, 'verb': 'tweet', 'object': 1, 'foreign_id': 'tweet:1'}
activity_response = user_feed_1.add_activity(activity_data)
# Create a bit more complex activity
activity_data = {'actor': 1, 'verb': 'run', 'object': 1, 'foreign_id': 'run:1',
'course': {'name': 'Golden Gate park', 'distance': 10},
'participants': ['Thierry', 'Tommaso'],
'started_at': datetime.datetime.now()
}
user_feed_1.add_activity(activity_data)
# Remove an activity by its id
user_feed_1.remove_activity("e561de8f-00f1-11e4-b400-0cc47a024be0")
# or by foreign id
user_feed_1.remove_activity(foreign_id='tweet:1')
# Follow another feed
user_feed_1.follow('flat', '42')
# Stop following another feed
user_feed_1.unfollow('flat', '42')
# List followers/following
following = user_feed_1.following(offset=0, limit=2)
followers = user_feed_1.followers(offset=0, limit=10)
# Creates many follow relationships in one request
follows = [
{'source': 'flat:1', 'target': 'user:1'},
{'source': 'flat:1', 'target': 'user:2'},
{'source': 'flat:1', 'target': 'user:3'}
]
client.follow_many(follows)
# Batch adding activities
activities = [
{'actor': 1, 'verb': 'tweet', 'object': 1},
{'actor': 2, 'verb': 'watch', 'object': 3}
]
user_feed_1.add_activities(activities)
# Add an activity and push it to other feeds too using the `to` field
activity = {
"actor":"1",
"verb":"like",
"object":"3",
"to":["user:44", "user:45"]
}
user_feed_1.add_activity(activity)
# Retrieve an activity by its ID
client.get_activities(ids=[activity_id])
# Retrieve an activity by the combination of foreign_id and time
client.get_activities(foreign_id_times=[
(foreign_id, activity_time),
])
# Enrich while getting activities
client.get_activities(ids=[activity_id], enrich=True, reactions={"counts": True})
# Update some parts of an activity with activity_partial_update
set = {
'product.name': 'boots',
'colors': {
'red': '0xFF0000',
'green': '0x00FF00'
}
}
unset = [ 'popularity', 'details.info' ]
# ...by ID
client.activity_partial_update(id=activity_id, set=set, unset=unset)
# ...or by combination of foreign_id and time
client.activity_partial_update(foreign_id=foreign_id, time=activity_time, set=set, unset=unset)
# Generating user token for client side usage (JS client)
user_token = client.create_user_token("user-42")
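Since `user_feed_1.get(limit=5, offset=5)` is the piece to get working, here is roughly how the two pagination styles above could map onto OpenSearch queries against the feeds index. A minimal sketch, assuming the `actor.id` and `published` fields from the indexed document earlier in this thread; `feed_page` and `feed_page_after` are hypothetical names, not stream-python or elasticfeeds APIs. The cursor variant uses `search_after` in place of stream's `id_lt`:

```python
from typing import List


def feed_page(actor_ids: List[str], limit: int = 5, offset: int = 0) -> dict:
    """Rough equivalent of user_feed.get(limit=5, offset=5): from/size paging."""
    return {
        "from": offset,
        "size": limit,
        "sort": [{"published": {"order": "desc"}}],
        "query": {"terms": {"actor.id": actor_ids}},
    }


def feed_page_after(actor_ids: List[str], last_sort: list, limit: int = 5) -> dict:
    """Cursor-style paging via search_after.

    `last_sort` is the `sort` array of the last hit on the previous page.
    Ties on `published` can skip or repeat hits unless a unique tiebreaker
    field is added to the sort.
    """
    query = feed_page(actor_ids, limit)
    del query["from"]  # search_after replaces the offset
    query["search_after"] = last_sort
    return query


first = feed_page(["blabla"], limit=5, offset=5)
# Hypothetical sort values copied from the last hit of the previous page.
following = feed_page_after(["blabla"], [1692778653350])
```

Deep `from` offsets get slower as they grow, which is the same trade-off the stream-python comments describe for `offset` versus `id_lt`.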
I have a pool of users, some of whom I don't care about.
I want to add friends (request / reply / accept friendship) and then only see a subset of this.
I built on top of this many years ago, but the code seems to be abandoned.
https://github.com/ging/social_stream
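One way to get request / accept semantics out of plain follow links: a friend request is a one-way follow, and accepting it is the follow back, so friends are the reciprocal pairs. A minimal sketch of that model; it is not an elasticfeeds feature as far as this thread shows, and the helper names are hypothetical. The pairs would come from the `actor_id` / `linked_activity.id` fields of the network index:

```python
from typing import Iterable, List, Set, Tuple

FollowPair = Tuple[str, str]  # (follower, followed)


def following_of(user: str, links: List[FollowPair]) -> Set[str]:
    return {b for a, b in links if a == user}


def followers_of(user: str, links: List[FollowPair]) -> Set[str]:
    return {a for a, b in links if b == user}


def friends_of(user: str, links: Iterable[FollowPair]) -> Set[str]:
    """Accepted friendships: both sides follow each other."""
    links = list(links)
    return following_of(user, links) & followers_of(user, links)


def pending_requests(user: str, links: Iterable[FollowPair]) -> Set[str]:
    """Requests to `user`: they follow `user`, but `user` has not followed back."""
    links = list(links)
    return followers_of(user, links) - following_of(user, links)


links = [("johnpope", "blabla"), ("blabla", "johnpope"), ("carol", "johnpope")]
print(friends_of("johnpope", links), pending_requests("johnpope", links))
```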
Are there any helpers available that could produce
"UserA, userB and 7 others liked your post."
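Nothing in this thread shows such a helper in elasticfeeds, but the text can be assembled from feed hits with two small functions: group the "liked" activities by object id, then render the first few names plus a count. A minimal sketch; both function names are hypothetical and the hit layout follows the `_source` document shown earlier:

```python
from typing import Dict, List


def likers_by_object(hits: List[dict], verb: str = "liked") -> Dict[str, List[str]]:
    """Group feed hits by object id, keeping each actor once in first-seen order."""
    grouped: Dict[str, List[str]] = {}
    for hit in hits:
        src = hit["_source"]
        if src["type"] != verb:
            continue
        names = grouped.setdefault(src["object"]["id"], [])
        if src["actor"]["id"] not in names:
            names.append(src["actor"]["id"])
    return grouped


def summarize(names: List[str], verb: str = "liked", target: str = "your post", shown: int = 2) -> str:
    """Render 'A, B and N others liked your post.' style text."""
    if not names:
        return ""
    if len(names) <= shown:
        who = names[0] if len(names) == 1 else ", ".join(names[:-1]) + " and " + names[-1]
    else:
        rest = len(names) - shown
        who = ", ".join(names[:shown]) + f" and {rest} other" + ("s" if rest != 1 else "")
    return f"{who} {verb} {target}."


print(summarize(["UserA", "userB"] + [f"user{i}" for i in range(7)]))
# UserA, userB and 7 others liked your post.
```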
Q) When viewing the feeds, is it a use case to then perform operations on them? Like "usera posted xyz..." and then the user can like it.